import java.util.concurrent.ConcurrentLinkedQueue;
import java.io.*;
import java.net.*;

/**
 *
 * Non-robust implementation of the sending end of a distributed queue
 * of objects of class E. The class E must be Serializable, as the
 * objects are moved using ObjectOutputStream and
 * ObjectInputStream. The receiver end is implemented in
 * ObjectQueueReceiverEnd.
 *
 * @author Jesper Buus Nielsen, Aarhus University, 2012.
 */

public class PointToPointQueueSenderEndNonRobust<E extends Serializable>
        extends Thread implements PointToPointQueueSenderEnd<E>  {

    /**
     * The address of the receiving end of the queue.
     */
    private InetSocketAddress receiverAddress;

    /**
     * The objects not yet delivered.
     */
    final private ConcurrentLinkedQueue<E> pendingObjects;

    /**
     * Used to signal that the queue should shut down.
     */
    private boolean shutdown = false;

    public PointToPointQueueSenderEndNonRobust() {
        this.pendingObjects = new ConcurrentLinkedQueue<E>();
    }

    /**
     *
     * @param serverAddress The IP address and defaultPort of the receiver end.
     */
    public void setReceiver(InetSocketAddress serverAddress) {
        if (this.receiverAddress==null && serverAddress!=null) {
            this.receiverAddress = serverAddress;
            this.start();
        }
    }

    /**
     * Puts a message in this queue. The call is asynchronous, i.e.,
     * it returns immediately. In particular, it returns before the
     * object is delivered at the receiver end. The manager of the
     * queue will later take care of moving the object to the
     * receiving side.
     *
     * @param object The message to be added to the queue.
     */
    public void put(E object) {
        assert (!shutdown) : "Cannot put after shutdown!";
        if (object == null) {
            throw new NullPointerException("Cannot send null's");
        }
        synchronized(pendingObjects) {
            boolean wasEmpty = pendingObjects.isEmpty();
            pendingObjects.add(object);
            if (wasEmpty) {
                // We wake up the manager if it waits for a new
                // message to send.
                pendingObjects.notify();
            }
        }
    }

    /**
     * Shuts down the queue. After this no more messages can be added
     * to the queue at the sending end. The queue will try to deliver
     * the objects already scheduled for delivery, but will terminate
     * on the first failed attempt. The safe way to use shutdown() is
     * therefore to call it only when no objects are pending!
     */
    public void shutdown() {
        synchronized (pendingObjects) {
            shutdown = true;
            pendingObjects.notifyAll();
        }
    }

    /**
     *
     * @return Whether all messages have been delivered.
     */
    public boolean isEmpty() {
        synchronized (pendingObjects) {
            return pendingObjects.isEmpty();
        }
    }

    /**
     * Takes the next pending message and tries to move it to the
     * queue of the receiving end. This is an extremely inefficient
     * implementation as it opens a new connection for each object to
     * be sent! This, however, is also a simple way to get some
     * rudimentary robustness: a dropped connection, or other
     * IOException, is handled simply by calling pushOneObject again.
     * Should only be called if there are objects to be sent.
     *
     * @return whether an object was transfered to the receiver end of
     *         the queue
     */
    private boolean pushOneObject() {
        if (receiverAddress==null) {
            return false;
        }
        E object = pendingObjects.peek();
        if (object == null) {
            return false;
        }
        Socket socket = null;
        ObjectOutputStream forSendingObjects = null;
        try {
            socket = new Socket(receiverAddress.getAddress(),
                    receiverAddress.getPort());
            forSendingObjects
                    = new ObjectOutputStream(socket.getOutputStream());
        } catch (UnknownHostException e) {
            log("Problems looking up " + receiverAddress + " ["+ e +"]");
            return false;
        } catch (IOException e) {
            log("Problems opening socket to " + receiverAddress
                    + " ["+ e +"]");
            return false;
        }
        try {
            forSendingObjects.writeObject(object);
        } catch (IOException e) {
            log("Could not push object [" + object + "]to host ["
                    + receiverAddress + "] because [" + e + "]");
            return false;
        } finally {
            try {
                forSendingObjects.close();
                socket.close();
            } catch (IOException e) {
                log(e);
            }
        }
        // When we make it here the object was pushed to the other
        // side, so we can remove it from the queue of pending pushes.

        pendingObjects.poll();
        return true;
    }

    /**
     * Internal method for waiting until one or more objects are
     * pending to be pushed. 
     */
    private void waitForObjectsToBePendingOrShutdown() {
        synchronized (pendingObjects) {
            while (pendingObjects.isEmpty() && !shutdown) {
                try {
                    // The put method will wake us up if messages
                    // arrive.  The shutdown method will wake us up if
                    // we are to shut down.
                    pendingObjects.wait();
                } catch (InterruptedException e) {
                    // Ignore. The while condition ensures proper
                    // behavior in case of interrupts.
                }
            }
            // Now objects are pending send or we are shutting down
        }
    }

    /**
     * @return The address of the receiver.
     */
    public InetSocketAddress getReceiverAddress() {
        return receiverAddress;
    }

    /**
     * Starts a thread which pushes objects in this queue to the receiver side.
     */
    public void run() {

        while (!shutdown) {
            waitForObjectsToBePendingOrShutdown();
            if (!shutdown) {
                //  We might have come out of
                //  waitForObjectsToBePendingOrShutdown() because of a
                //  shutdown. If not, then a message is ready to be
                //  sent.
                pushOneObject();
            }
        }

        // We are shutting down. Will try to send the remaining
        // messages.  However, on the first delivery error, I will
        // terminate. Here we go:
        boolean allOkSoFar = true;
        while (!pendingObjects.isEmpty() && allOkSoFar) {
            allOkSoFar = pushOneObject();
        }

        if (!pendingObjects.isEmpty()) {
            log("Warning: PointToPointQueueSendingEnd shutting down with"
                    + pendingObjects.size() + " pending messages.");
        }
    }

    /***
     ** HELPERS FOR DEBUGGING
     **/
    protected boolean log = false;

    public void printLog() {
        log = true;
    }

    protected void log(String msg) {
        if (log) System.out.println(this + " said: " + msg);
    }

    protected void log(Exception e) {
        if (log) System.out.println(this + " cast: " + e);
    }

}
